Working With Topics
Starting with DXAPI version 5.5.42, full support for working with Topics has been introduced.
This includes the ability to write data into topics and read data from them.
tip
Additional information about Topics can be found here
Working with topics in the C++ dxapi
is closely tied to the topic schema.
Therefore, it is highly recommended to use the Solution Generator
to generate your code for reading and writing,
ensuring compatibility with the topic's structure.
tip
Additional information about Solution Generator can be found here
Before proceeding with examples of reading and writing, ensure you have created a topic and generated a project capable of interacting with topics.
Creating a Topic
To create a topic, use the CREATE TOPIC statement in DDL. For example:
CREATE TOPIC IF NOT EXISTS "trades-topic" (
    ENUM "deltix.timebase.api.messages.AggressorSide" 'Aggressor Side' (
        "BUY" = 0,
        "SELL" = 1
    );
    ENUM "deltix.timebase.api.messages.MarketEventType" 'Market Event Type' (
        "BID" = 0,
        "OFFER" = 1,
        "TRADE" = 2,
        "INDEX_VALUE" = 3,
        "OPENING_PRICE" = 4,
        "CLOSING_PRICE" = 5,
        "SETTLEMENT_PRICE" = 6,
        "TRADING_SESSION_HIGH_PRICE" = 7,
        "TRADING_SESSION_LOW_PRICE" = 8,
        "TRADING_SESSION_VWAP_PRICE" = 9,
        "IMBALANCE" = 10,
        "TRADE_VOLUME" = 11,
        "OPEN_INTEREST" = 12,
        "COMPOSITE_UNDERLYING_PRICE" = 13,
        "SIMULATED_SELL_PRICE" = 14,
        "SIMULATED_BUY_PRICE" = 15,
        "MARGIN_RATE" = 16,
        "MID_PRICE" = 17,
        "EMPTY_BOOK" = 18,
        "SETTLE_HIGH_PRICE" = 19,
        "SETTLE_LOW_PRICE" = 20,
        "PRIOR_SETTLE_PRICE" = 21,
        "SESSION_HIGH_BID" = 22,
        "SESSION_LOW_OFFER" = 23,
        "EARLY_PRICE" = 24,
        "AUCTION_CLEARING_PRICE" = 25,
        "SWAP_VALUE_FACTOR" = 26,
        "VALUE_ADJ_LONG" = 27,
        "CUMMULATIVE_VALUE_ADJ_LONG" = 28,
        "DAILY_VALUE_ADJ_SHORT" = 29,
        "CUMMULATIVE_VALUE_ADJ_SHORT" = 30,
        "FIXING_PRICE" = 31,
        "CASH_RATE" = 32,
        "RECOVERY_RATE" = 33,
        "RECOVERY_RATE_LONG" = 34,
        "RECOVERY_RATE_SHORT" = 35
    );
    CLASS "deltix.timebase.api.messages.MarketMessage" (
        "sequenceNumber" 'Sequence Number' INTEGER
    )
        AUXILIARY
        NOT INSTANTIABLE;
    CLASS "deltix.timebase.api.messages.TradeMessage" UNDER "deltix.timebase.api.messages.MarketMessage" (
        "exchangeId" 'Exchange Code' VARCHAR ALPHANUMERIC (10),
        "price" 'Price' FLOAT DECIMAL,
        "size" 'Size' FLOAT DECIMAL,
        "condition" 'Trade Condition' VARCHAR,
        "aggressorSide" 'Aggressor Side' "deltix.timebase.api.messages.AggressorSide",
        "netPriceChange" 'Net Price Change' FLOAT DECIMAL,
        "eventType" 'Event Type' "deltix.timebase.api.messages.MarketEventType"
    );
)
OPTIONS(COPY_STREAM = 'trades-topic-copy')
Generating a Project
To create a project, use the Solution Generator as shown below:
./solgen.sh -cpp \
    -cpp.project-type vs2015 \
    -cpp.WriteTopic \
    -cpp.ReadTopic \
    cpp.project.root /path/to/sample \
    timebase.topic trades-topic \
    timebase.url dxtick://localhost:8011
Upon execution, the Solution Generator will create a new C++ solution in the specified folder (/path/to/sample
in this example).
The generated solution can be opened and compiled directly within the Visual Studio IDE.
Writing to a Topic
Below is an example of writing to a topic, generated by the Solution Generator:
void writetopic() {
// Create TimeBase connection
unique_ptr<TickDb> db(TickDb::createFromUrl("dxtick://localhost:8011"));
try {
// Open in read-write mode
db->open(false);
TopicDB & topicDB = db->getTopicDB();
// Create topic publisher
PublishingOptions opt;
std::string key = "trades-topic";
unique_ptr<TickDirectLoader> loader(topicDB.createPublisher(key, opt));
dx_trades_topic::Dx_Trades_topicTopicEncoder encoder(loader.get());
// Message for writing
dx_trades_topic::deltix::timebase::api::messages::Dx_TradeMessage dx_trademessage;
// Create instruments
auto instrument = loader->getInstrumentId(InstrumentIdentity(DxApi::InstrumentType::EQUITY, "SYMBOL"));
for (int sent = 0; sent < 10; ++sent) {
// Write fields of deltix.timebase.api.messages.TradeMessage:
dx_trademessage.clear();
dx_trademessage.timestamp = now_ns();
dx_trademessage.typeId = dx_trades_topic::Dx_TradeMessageType;
dx_trademessage.entityId = instrument;
dx_trademessage.setSequenceNumber(1);
dx_trademessage.setExchangeId("NY4");
dx_trademessage.setPrice(1.5);
dx_trademessage.setSize(1.5);
dx_trademessage.setNetPriceChange(1.5);
dx_trademessage.setAggressorSide(dx_trades_topic::deltix::timebase::api::messages::Dx_AggressorSideEnum::BUY);
dx_trademessage.setCondition("Any string");
dx_trademessage.setEventType(dx_trades_topic::deltix::timebase::api::messages::Dx_MarketEventTypeEnum::BID);
encoder.send(dx_trademessage);
cout << "[SENT]: " << dx_trademessage.toString() << endl;
}
// Close loader after send
loader->close();
} catch (exception &e) {
cout << e.what() << endl;
} catch (...) {
cout << "System exception" << endl;
}
db->close();
}
Reading from a Topic
Below is an example of reading data from a topic, generated by the Solution Generator:
/// Reads messages from the "trades-topic" topic and prints them to stdout.
///
/// Connects to dxtick://localhost:8011, creates a polling consumer for the
/// topic and keeps polling until at least 10 messages have been counted or
/// the poller reports that it is at end. Exceptions raised inside the try
/// block are reported on stdout and not rethrown; the connection is closed
/// afterwards in either case.
void readtopic() {
    // Create TimeBase connection
    unique_ptr<TickDb> db(TickDb::createFromUrl("dxtick://localhost:8011"));

    try {
        // Open TimeBase connection
        db->open(false);
        TopicDB & topicDB = db->getTopicDB();

        // Create topic poller
        ConsumerOptions opt;
        unique_ptr<TickMessagePoller> messagePoller(topicDB.createPollingConsumer("trades-topic", opt));
        // The generated decoder is bound to the poller it reads from.
        dx_trades_topic::Dx_Trades_topicTopicDecoder decoder(messagePoller.get());

        int read = 0;

        // Callback handed to processMessages(): decodes the message and prints it.
        // The decoder is captured by reference; it outlives every use of the
        // processor within this scope.
        MessageProcessor messageProcessor([&decoder](const InstrumentMessage& header) {
            NativeMessage* message = decoder.decode(header);
            cout << "[MSG]: " << message->toString() << endl;
        });

        // The return value of processMessages() is accumulated as the message
        // count; the first argument (10) is presumably the per-call message
        // limit - confirm against the TickMessagePoller API.
        while (!messagePoller->isAtEnd() && read < 10) {
            read += messagePoller->processMessages(10, messageProcessor);
        }
        cout << "Read " << read << " messages" << endl;

        // Close messagePoller after reading
        messagePoller->close();
    } catch (const exception &e) {
        // Caught by const reference: the handler only reads the exception.
        cout << e.what() << endl;
    } catch (...) {
        cout << "System exception" << endl;
    }

    // Reached on both the success and the error path, because the handlers
    // above do not rethrow.
    db->close();
}